package com.bst.etl.service.impl;

import com.bst.common.constant.JobConstant;
import com.bst.common.core.domain.AjaxResult;
import com.bst.common.utils.DateUtil;
import com.bst.common.vo.DataExecParamVO;
import com.bst.common.vo.JobExecParamVO;
import com.bst.etl.batch.BatchExecService;
import com.bst.etl.core.masterworker.Master;
import com.bst.etl.core.masterworker.Worker;
import com.bst.etl.domain.EtlJobDrawbase;
import com.bst.etl.domain.EtlJobIndex;
import com.bst.etl.domain.EtlTaskExec;
import com.bst.etl.service.*;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Service
public class EtlJobRunServiceImpl implements IEtlJobRunService {

    @Autowired
    IEtlTaskExecService taskExecService;
    @Autowired
    IEtlDrawBaseService drawBaseService;
    @Autowired
    IEtlDrawDwService drawDwService;
    @Autowired
    IEtlTaskExecItemService execItemService;
    @Autowired
    IEtlJobDrawbaseService etlJobDrawbaseService;
    @Autowired
    BatchExecService batchExecService;
    @Autowired
    IEtlBaseVariService etlBaseVariService;
    @Autowired
    IEtlJobIndexService etlJobIndexService;

    @Override
    public AjaxResult run(JobExecParamVO jobExecParamVO) throws ParseException {
        Date startDate = DateUtil.FORMAT_YYYYMMDD.parse(jobExecParamVO.getStartDay());
        Date endDate = DateUtil.FORMAT_YYYYMMDD.parse(jobExecParamVO.getEndDay());
        String type = jobExecParamVO.getType();
        String idTasks = jobExecParamVO.getIdTasks();
        Integer execType = jobExecParamVO.getExecType()==null?1:jobExecParamVO.getExecType();
        EtlTaskExec taskExec = new EtlTaskExec();
        taskExec.setDtBegin(new Date());
        taskExec.setEuStatus("STARTING");
        taskExec.setScope(jobExecParamVO.getStartDay()+"~"+jobExecParamVO.getEndDay());
        taskExec.setSdEtlcls(type);
        taskExec.setIdJoblog(jobExecParamVO.getJobLogId());
        taskExec.setNa(type+"["+taskExec.getScope()+"]");
        taskExec.setEuTp(jobExecParamVO.getExecType());
        taskExecService.insertEtlTaskExec(taskExec);
        Long idExec = taskExec.getIdExec();
        List<String> ids = Arrays.asList(idTasks.split(","));
        List<DataExecParamVO> list=null;
        String status="NOT EXEC";
        if(JobConstant.JOB_TYPE_BASE.equals(type)) {
            list = drawBaseService.findBaseTaskByIds(ids,startDate,endDate);
            sqlVariHandler(list,null);
            status = runBatchJobList(list,idExec,execType);
        } else if(JobConstant.JOB_TYPE_DW.equals(type)) {
            list = drawDwService.findParamVO(ids,startDate,endDate);
            sqlVariHandler(list,null);
            status = runBatchJobList(list,idExec,execType);
        } else if(JobConstant.JOB_TYPE_ALL.equals(type)) {
            for(String jobId:ids) {
                /*if(isJobCheck(Long.parseLong(jobId),jobExecParamVO.getEndDay())) {
                    taskExecService.deleteEtlTaskExecByIdExec(idExec);
                    return AjaxResult.error("存在已审核指标，数据不可重抽");
                }*/
                EtlJobDrawbase jobDrawbase = new EtlJobDrawbase();
                jobDrawbase.setIdJob(Long.parseLong(jobId));
                List<EtlJobDrawbase> jobDrawbaseList = etlJobDrawbaseService.selectEtlJobDrawbaseList(jobDrawbase);
                List<String> baseIds = new ArrayList<>();
                for(EtlJobDrawbase db:jobDrawbaseList) {
                    baseIds.add(db.getIdDrawBase()+"");
                }
                list = drawBaseService.findBaseTaskByIds(baseIds,startDate,endDate);
                sqlVariHandler(list,Long.parseLong(jobId));
                if(list.size()>0) {
                    status = runBatchJobList(list,idExec,execType);
                    list = drawDwService.findParamVOByJobId(Long.parseLong(jobId),startDate,endDate);
                    if(list.size()>0) {
                        status = runBatchJobList(list,idExec,execType);
                    }
                }
                status = runIndexs(list,idExec,execType);
            }
        }
        taskExec.setDtEnd(new Date());
        taskExec.setEuStatus(status);
        taskExec.setTimes((taskExec.getDtEnd().getTime()-taskExec.getDtBegin().getTime())/1000);
        taskExecService.updateEtlTaskExec(taskExec);
        return AjaxResult.success(BatchStatus.COMPLETED.name().equals(status)?1:0);
    }

    /**
     * sql变量替换
     * @return
     */
    private void sqlVariHandler(List<DataExecParamVO> list, Long jobId) {
        Map<String,String> vari=etlBaseVariService.queryAllVariByIdJob(jobId);
        String delSql,qrySql;
        for(DataExecParamVO paramVO:list) {
            delSql = paramVO.getDelSql();
            qrySql = paramVO.getQuerySql();
            for(Map.Entry<String,String> entry:vari.entrySet()) {
                qrySql = qrySql.replaceAll("\\$\\{"+entry.getKey()+"\\}",entry.getValue());
                delSql = delSql.replaceAll("\\$\\{"+entry.getKey()+"\\}",entry.getValue());
            }
            paramVO.setQuerySql(qrySql);
            paramVO.setDelSql(delSql);
        }
    }

    private String runIndexs(List<DataExecParamVO> params, Long idExec, Integer exeType) {
        if (params == null || params.size() == 0) return BatchStatus.COMPLETED.name();
        ConcurrentHashMap<Long, JobExecution> resultMap = new ConcurrentHashMap<>();
        for (DataExecParamVO param : params) {
            param.setIdExec(idExec);
            param.setExeType(exeType);
            batchExecService.runJob(getJobParameters(param),resultMap);
        }
        for(JobExecution r : resultMap.values()) {
            if(!BatchStatus.COMPLETED.name().equals(r.getStatus().name())) {
                return BatchStatus.FAILED.name();
            }
        }
        return BatchStatus.COMPLETED.name();
    }

    private String runBatchJobList(List<DataExecParamVO> params, Long idExec, Integer exeType) {
        if (params == null || params.size() == 0) return BatchStatus.COMPLETED.name();
        Master master = new Master(new Worker(batchExecService), 4);
        for (DataExecParamVO param : params) {
            param.setIdExec(idExec);
            param.setExeType(exeType);
            master.submit(getJobParameters(param));
        }
        master.execute();
        while (true) {
            if(master.isCompleted()) {
                return master.getResult();
            }
        }

    }

    private JobParameters getJobParameters(DataExecParamVO param) {
        try {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .addLong(JobConstant.JOB_ID_EXEC,param.getIdExec())
                    .addDate(JobConstant.JOB_DT_DAY_BEGIN,param.getStartDate())
                    .addDate(JobConstant.JOB_DT_DAY_END,param.getEndDate())
                    .addString(JobConstant.JOB_DES, param.getDes())
                    .addString(JobConstant.JOB_DELSQL, param.getDelSql())
                    .addString(JobConstant.JOB_QUERYSQL, param.getQuerySql())
                    .addString(JobConstant.JOB_TABLENAME, param.getTbTar())
                    .addString(JobConstant.JOB_TYPE, param.getJobType())
                    .addString(JobConstant.JOB_DB_SOU, param.getIdDbSou())
                    .addString(JobConstant.JOB_DB_TAR, param.getIdDbTar())
                    .addLong(JobConstant.JOB_EXE_TYPE, param.getExeType().longValue());
            return jobParametersBuilder.toJobParameters();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}
